package com.hanxiaozhang.io.makereactor_5;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Multiplexer (selector) worker thread.
 *
 * <p>Each instance owns exactly one {@link Selector} and runs an event loop on it. In a
 * multi-threaded setup the concurrent clients are spread over several such instances; every
 * client is bound to exactly one selector, so clients never have to be coordinated across
 * selector threads.
 *
 * <p>Channels to be registered are handed over through {@link #lbq}; the producer is expected to
 * call {@code selector.wakeup()} afterwards so the loop leaves {@code select()} and performs the
 * registration on this thread (presumably done by {@code SelectorThreadGroup} - confirm there).
 *
 * @author hanxinghua
 * @create 2021/8/25
 * @since 1.0.0
 */
public class SelectorThread extends ThreadLocal<LinkedBlockingQueue<Channel>> implements Runnable {

    /** Capacity in bytes of the direct buffer attached to every registered client channel. */
    private static final int CLIENT_BUFFER_SIZE = 4096;

    /** Selector owned by this worker; opened in the constructor. */
    public Selector selector = null;

    /**
     * Queue of channels waiting to be registered with {@link #selector}.
     *
     * <p>The initializer runs on the constructing thread and calls {@link #get()} on this
     * instance, which in turn invokes {@link #initialValue()}; because every
     * {@code SelectorThread} is its own {@code ThreadLocal}, each instance ends up with its own
     * independent queue.
     */
    public LinkedBlockingQueue<Channel> lbq = get();

    /** Group used to pick the selector thread that should serve a newly accepted client. */
    private SelectorThreadGroup selectorThreadGroup;


    /**
     * Supplies the per-instance registration queue. This is the extension point if a richer
     * structure (e.g. a pool) is ever needed.
     *
     * @return a new, empty, unbounded queue
     */
    @Override
    protected LinkedBlockingQueue<Channel> initialValue() {
        return new LinkedBlockingQueue<>();
    }


    /**
     * Creates the worker and opens its selector.
     *
     * @param selectorThreadGroup group that distributes accepted clients among selector threads
     * @throws IllegalStateException if the selector cannot be opened; failing here avoids a
     *         half-built instance whose {@code selector} is {@code null}
     */
    SelectorThread(SelectorThreadGroup selectorThreadGroup) {
        this.selectorThreadGroup = selectorThreadGroup;
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new IllegalStateException("Failed to open selector", e);
        }
    }


    /**
     * Event loop: block in {@code select()}, dispatch ready keys, then register every channel
     * that was queued in {@link #lbq}.
     *
     * <p>The loop ends when the running thread is interrupted. An interrupted thread makes
     * {@code select()} return immediately with the interrupt status still set, so continuing
     * would only busy-spin.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // 1. Blocks until a channel is ready or another thread calls selector.wakeup().
                int num = selector.select();
                System.out.println("selector.select()....");

                // 2. Handle the ready keys sequentially.
                if (num > 0) {
                    dispatchSelectedKeys();
                }

                // 3. Run the pending registration tasks.
                drainPendingRegistrations();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Removes each key from the selected set and routes it to the accept or read handler. */
    private void dispatchSelectedKeys() {
        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            if (!key.isValid()) {
                // The ready-operation queries below throw on a cancelled key.
                continue;
            }
            // OP_WRITE is never registered by this class, so only accept/read can be ready.
            if (key.isAcceptable()) {
                acceptHandler(key);
            } else if (key.isReadable()) {
                readHandler(key);
            }
        }
    }

    /**
     * Registers every queued channel. Wakeups coalesce, so several channels may be waiting
     * after a single return from {@code select()}; handling only one of them would leave the
     * others unregistered until some later, unrelated wakeup.
     */
    private void drainPendingRegistrations() {
        Channel channel;
        while ((channel = lbq.poll()) != null) {
            try {
                register(channel);
            } catch (IOException e) {
                // One broken channel must not prevent the remaining ones from being registered.
                e.printStackTrace();
                closeAfterFailure(channel);
            }
        }
    }

    /**
     * Registers a listening channel for accept events or a client channel for read events
     * (with a fresh direct buffer as attachment). Other channel types are ignored.
     *
     * @param channel channel taken from the registration queue
     * @throws IOException if the channel is closed or its remote address cannot be read
     */
    private void register(Channel channel) throws IOException {
        if (channel instanceof ServerSocketChannel) {
            ServerSocketChannel server = (ServerSocketChannel) channel;
            server.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println(Thread.currentThread().getName() + " register listen");
        } else if (channel instanceof SocketChannel) {
            SocketChannel client = (SocketChannel) channel;
            ByteBuffer buffer = ByteBuffer.allocateDirect(CLIENT_BUFFER_SIZE);
            client.register(selector, SelectionKey.OP_READ, buffer);
            System.out.println(Thread.currentThread().getName() + " register client: " + client.getRemoteAddress());
        }
    }

    /**
     * Echoes everything currently readable on the client back to it, and closes the client on
     * end-of-stream or on an I/O error.
     *
     * @param key readable key whose attachment is the client's {@link ByteBuffer}
     */
    private void readHandler(SelectionKey key) {
        System.out.println(Thread.currentThread().getName() + " readHandler......");
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        SocketChannel client = (SocketChannel) key.channel();
        buffer.clear();
        try {
            while (true) {
                int num = client.read(buffer);
                if (num > 0) {
                    // Flip what was read and write it straight back.
                    buffer.flip();
                    // NOTE(review): spins while the socket send buffer is full; a complete fix
                    // would register OP_WRITE and resume later.
                    while (buffer.hasRemaining()) {
                        client.write(buffer);
                    }
                    buffer.clear();
                } else if (num == 0) {
                    // Nothing more to read for now.
                    return;
                } else {
                    // Client disconnected.
                    System.out.println("client： " + client.getRemoteAddress() + " closed!");
                    client.close();
                    return;
                }
            }
        } catch (IOException e) {
            // A failed read/write (e.g. connection reset) would fail again on every retry, so
            // drop the connection instead of looping forever.
            e.printStackTrace();
            key.cancel();
            closeAfterFailure(client);
        }
    }

    /**
     * Accepts a pending connection, switches it to non-blocking mode and hands it to the group,
     * which chooses the selector thread it will be registered with.
     *
     * @param key acceptable key of a {@link ServerSocketChannel}
     */
    private void acceptHandler(SelectionKey key) {
        System.out.println(Thread.currentThread().getName() + " acceptHandler......");

        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        SocketChannel client = null;
        try {
            client = server.accept();
            if (client == null) {
                // A non-blocking accept returns null when no connection is pending.
                return;
            }
            client.configureBlocking(false);
            // Pick a multiplexer for the new client and register it there.
            selectorThreadGroup.nextSelector(client);
        } catch (IOException e) {
            e.printStackTrace();
            // Do not leak a socket that was accepted but could not be handed off.
            closeAfterFailure(client);
        }
    }

    /**
     * Closes a channel while already handling an error; a failure of the close itself is only
     * reported.
     *
     * @param channel channel to close, may be {@code null}
     */
    private static void closeAfterFailure(Channel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Replaces the group used to distribute accepted clients.
     *
     * @param selectorThreadGroup group to use for subsequent accepts
     */
    public void setWorker(SelectorThreadGroup selectorThreadGroup) {
        this.selectorThreadGroup = selectorThreadGroup;
    }


}
